// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//


#include <pollux/dwio/common/flat_map_helper.h>
#include <pollux/common/exception/exceptions.h>
#include <pollux/vector/dictionary_vector.h>

namespace kumo::pollux::dwio::common::flatmap {
    namespace detail {
        // Prepares an already-allocated result vector for reuse with `size` rows.
        //
        // - A null `vector` is left untouched.
        // - A vector with more than one owner is released (set to null); callers
        //   in this file allocate a fresh vector when they see a null pointer.
        // - Otherwise the nulls buffer is created or dropped so that its presence
        //   matches `hasNulls`, and the vector is resized to `size`.
        void reset(VectorPtr &vector, vector_size_t size, bool hasNulls) {
            if (vector == nullptr) {
                return;
            }

            // Somebody else still holds this vector, so it cannot be recycled.
            if (vector.use_count() > 1) {
                vector.reset();
                return;
            }

            const bool currentlyHasNulls = vector->may_have_nulls();
            if (hasNulls && !currentlyHasNulls) {
                vector->mutable_nulls(size);
            } else if (!hasNulls && currentlyHasNulls) {
                vector->reset_nulls();
            }
            vector->resize(size);
        }

        // Prepares `vector` as a flat StringView vector sized for the combined row
        // count of `vectors`.
        //
        // The string buffers of every flat input (looked up through
        // wrapped_vector()) are collected and handed to initializeFlatVector.
        // Inputs that are not flat string vectors must be constant-encoded and
        // may-have-nulls, otherwise DWIO_ENSURE fails with "Unsupported encoding".
        void initializeStringVector(
            VectorPtr &vector,
            const TypePtr &type,
            memory::MemoryPool &pool,
            const std::vector<const BaseVector *> &vectors) {
            vector_size_t totalRows = 0;
            bool anyNulls = false;
            std::vector<BufferPtr> collected;
            for (const BaseVector *vec: vectors) {
                totalRows += vec->size();
                if (!anyNulls) {
                    anyNulls = vec->may_have_nulls();
                }
                const auto *flat =
                        dynamic_cast<const FlatVector<StringView> *>(vec->wrapped_vector());
                if (flat == nullptr) {
                    DWIO_ENSURE(
                        vec->is_constant_encoding() && vec->may_have_nulls(),
                        "Unsupported encoding");
                    continue;
                }
                const auto &owned = flat->stringBuffers();
                collected.insert(collected.end(), owned.begin(), owned.end());
            }
            initializeFlatVector<StringView>(
                vector, pool, type, totalRows, anyNulls, std::move(collected));
        }
    } // namespace detail

    // Prepares `vector` as a flat vector of kind K large enough to receive every
    // row of `vectors`.
    //
    // Each input must either be a FlatVector of K's native type or a
    // constant-encoded vector that may have nulls; anything else fails the
    // DWIO_ENSURE check with "Unsupported encoding".
    template<TypeKind K>
    void initializeFlatVector(
        VectorPtr &vector,
        memory::MemoryPool &pool,
        const TypePtr &type,
        const std::vector<const BaseVector *> &vectors) {
        using ValueType = typename pollux::TypeTraits<K>::NativeType;
        vector_size_t totalRows = 0;
        bool anyNulls = false;
        for (const BaseVector *vec: vectors) {
            const bool isFlat = dynamic_cast<const FlatVector<ValueType> *>(vec) != nullptr;
            if (!isFlat) {
                DWIO_ENSURE(
                    vec->is_constant_encoding() && vec->may_have_nulls(),
                    "Unsupported encoding");
            }
            totalRows += vec->size();
            if (!anyNulls) {
                anyNulls = vec->may_have_nulls();
            }
        }
        initializeFlatVector<ValueType>(vector, pool, type, totalRows, anyNulls);
    }

    // Primary template: for type kinds without an explicit specialization, the
    // result vector is prepared as a flat vector of kind K. Note the argument
    // order differs from initializeFlatVector (pool before type).
    template<TypeKind K>
    void initializeVectorImpl(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        initializeFlatVector<K>(vector, pool, type, vectors);
    }

    // VARCHAR specialization: delegates to detail::initializeStringVector, which
    // also gathers the string buffers of the inputs.
    template<>
    void initializeVectorImpl<TypeKind::VARCHAR>(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        detail::initializeStringVector(vector, type, pool, vectors);
    }

    // VARBINARY specialization: handled exactly like VARCHAR, by delegating to
    // detail::initializeStringVector.
    template<>
    void initializeVectorImpl<TypeKind::VARBINARY>(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        detail::initializeStringVector(vector, type, pool, vectors);
    }

    namespace {
        // Appends `vector` to `vectors`, ignoring null pointers and vectors whose
        // size() is zero.
        void addVector(
            std::vector<const BaseVector *> &vectors,
            const BaseVector *vector) {
            if (vector == nullptr || vector->size() == 0) {
                return;
            }
            vectors.push_back(vector);
        }
    } // namespace

    // ARRAY specialization: prepares `vector` as an ArrayVector sized for the
    // combined row count of `vectors`, then recursively prepares its elements
    // child from the element vectors of the inputs.
    //
    // Accepted inputs are ArrayVector, DictionaryVector<ComplexType>, or a
    // constant-encoded vector that may have nulls; anything else fails the
    // DWIO_ENSURE check with "Unsupported encoding".
    template<>
    void initializeVectorImpl<TypeKind::ARRAY>(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        vector_size_t totalRows = 0;
        bool anyNulls = false;
        std::vector<const BaseVector *> elementSources;
        // Owns the placeholder element vectors that `elementSources` refers to by
        // raw pointer.
        std::vector<pollux::VectorPtr> placeholders;
        for (const BaseVector *vec: vectors) {
            totalRows += vec->size();
            if (!anyNulls) {
                anyNulls = vec->may_have_nulls();
            }
            if (const auto *arrayInput = dynamic_cast<const ArrayVector *>(vec)) {
                addVector(elementSources, arrayInput->elements().get());
                continue;
            }
            if (const auto *dictInput =
                    dynamic_cast<const DictionaryVector<pollux::ComplexType> *>(vec)) {
                // Stand in for the dictionary's elements with a null constant that
                // has as many rows as the dictionary vector itself.
                // ASSUMPTION: dictionary encodings are not nested.
                auto placeholder = BaseVector::create_null_constant(
                    type->as_array().elementType(), dictInput->size(), &pool);
                addVector(elementSources, placeholder.get());
                // Keep the placeholder alive until the elements are initialized.
                placeholders.emplace_back(std::move(placeholder));
                continue;
            }
            DWIO_ENSURE(
                vec->is_constant_encoding() && vec->may_have_nulls(),
                "Unsupported encoding");
        }

        detail::reset(vector, totalRows, anyNulls);
        VectorPtr previousElements;
        if (vector) {
            auto &existing = dynamic_cast<ArrayVector &>(*vector);
            // Hold on to the elements child before the container may be released
            // below, so it can be handed to a newly created ArrayVector.
            previousElements = existing.elements();
            detail::resetIfNotWritable(
                vector, existing.nulls(), existing.offsets(), existing.sizes());
        }

        if (!vector) {
            // Prefer the type of the retained elements child when there is one.
            auto arrayType =
                    previousElements ? ARRAY(previousElements->type()) : type;
            vector = std::make_shared<ArrayVector>(
                &pool,
                arrayType,
                anyNulls ? AlignedBuffer::allocate<bool>(totalRows, &pool) : nullptr,
                0 /* length */,
                AlignedBuffer::allocate<vector_size_t>(totalRows, &pool),
                AlignedBuffer::allocate<vector_size_t>(totalRows, &pool),
                previousElements,
                0 /* nullCount */);
        }
        previousElements.reset();

        if (!elementSources.empty()) {
            initializeVector(
                vector->as<ArrayVector>()->elements(),
                type->as_array().elementType(),
                pool,
                elementSources);
        }
    }

    // Prepares `vector` as a MapVector sized for the combined row count of
    // `vectors` (or `sizeOverride` when it holds a value), then recursively
    // prepares the keys and values children from the inputs' children.
    //
    // Every input must be non-null and either a MapVector or a constant-encoded
    // vector that may have nulls; anything else fails the DWIO_ENSURE check with
    // "Unsupported encoding".
    void initializeMapVector(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors,
        std::optional<vector_size_t> sizeOverride) {
        vector_size_t totalRows = 0;
        bool anyNulls = false;
        std::vector<const BaseVector *> keySources;
        std::vector<const BaseVector *> valueSources;
        for (const BaseVector *vec: vectors) {
            POLLUX_CHECK_NOT_NULL(vec);
            totalRows += vec->size();
            if (!anyNulls) {
                anyNulls = vec->may_have_nulls();
            }
            const auto *mapInput = dynamic_cast<const MapVector *>(vec);
            if (mapInput == nullptr) {
                DWIO_ENSURE(
                    vec->is_constant_encoding() && vec->may_have_nulls(),
                    "Unsupported encoding");
                continue;
            }
            addVector(keySources, mapInput->mapKeys().get());
            addVector(valueSources, mapInput->mapValues().get());
        }

        // An explicit size replaces the accumulated row count.
        if (sizeOverride) {
            totalRows = *sizeOverride;
        }

        detail::reset(vector, totalRows, anyNulls);
        VectorPtr previousKeys;
        VectorPtr previousValues;
        if (vector) {
            auto &existing = dynamic_cast<MapVector &>(*vector);
            // Retain the children before the container may be released below.
            previousKeys = existing.mapKeys();
            previousValues = existing.mapValues();
            detail::resetIfNotWritable(
                vector, existing.nulls(), existing.offsets(), existing.sizes());
        }

        if (!vector) {
            // When read-string-as-row flag is on, string readers produce ROW(BIGINT,
            // BIGINT) type instead of VARCHAR or VARBINARY. In these cases, type_->type
            // is not the right type of the final vector.
            const bool haveBothChildren =
                    previousKeys != nullptr && previousValues != nullptr;
            auto resultMapType = haveBothChildren
                                     ? MAP(previousKeys->type(), previousValues->type())
                                     : type;

            vector = std::make_shared<MapVector>(
                &pool,
                resultMapType,
                anyNulls ? AlignedBuffer::allocate<bool>(totalRows, &pool) : nullptr,
                0 /* length */,
                AlignedBuffer::allocate<vector_size_t>(totalRows, &pool),
                AlignedBuffer::allocate<vector_size_t>(totalRows, &pool),
                previousKeys,
                previousValues,
                0 /* nullCount */);
        }
        previousKeys.reset();
        previousValues.reset();

        auto &mapType = type->as_map();
        auto &result = dynamic_cast<MapVector &>(*vector);
        if (!keySources.empty()) {
            initializeVector(result.mapKeys(), mapType.keyType(), pool, keySources);
        }
        if (!valueSources.empty()) {
            initializeVector(result.mapValues(), mapType.valueType(), pool, valueSources);
        }
    }

    // MAP specialization: delegates to initializeMapVector. No size override is
    // passed here, so that parameter takes whatever default its declaration
    // provides.
    template<>
    void initializeVectorImpl<TypeKind::MAP>(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        initializeMapVector(vector, type, pool, vectors);
    }

    // ROW specialization: prepares `vector` as a RowVector sized for the
    // combined row count of `vectors`, then recursively prepares each child
    // column from the matching children of the inputs.
    //
    // Every input is cast to RowVector with a reference dynamic_cast, so a
    // non-row input raises std::bad_cast.
    template<>
    void initializeVectorImpl<TypeKind::ROW>(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        const auto &rowType = type->as_row();
        vector_size_t totalRows = 0;
        bool anyNulls = false;
        // One list of source vectors per child column.
        std::vector<std::vector<const BaseVector *> > childSources(rowType.size());
        for (const BaseVector *vec: vectors) {
            totalRows += vec->size();
            if (!anyNulls) {
                anyNulls = vec->may_have_nulls();
            }
            const auto &rowInput = dynamic_cast<const RowVector &>(*vec);
            for (size_t col = 0; col < rowType.size(); ++col) {
                childSources.at(col).push_back(rowInput.childAt(col).get());
            }
        }

        detail::reset(vector, totalRows, anyNulls);
        std::vector<VectorPtr> previousChildren;
        if (vector) {
            auto &existing = dynamic_cast<RowVector &>(*vector);
            // Retain the children before the container may be released below.
            previousChildren = existing.children();
            detail::resetIfNotWritable(vector, vector->nulls());
        } else {
            previousChildren.resize(rowType.size());
        }

        if (!vector) {
            // When read-string-as-row flag is on, string readers produce ROW(BIGINT,
            // BIGINT) type instead of VARCHAR or VARBINARY. In these cases, type_->type
            // is not the right type of the final struct.
            std::vector<TypePtr> childTypes;
            childTypes.reserve(previousChildren.size());
            for (size_t i = 0; i < previousChildren.size(); ++i) {
                const auto &child = previousChildren[i];
                if (child) {
                    childTypes.emplace_back(child->type());
                } else {
                    childTypes.emplace_back(type->childAt(i));
                }
            }

            vector = std::make_shared<RowVector>(
                &pool,
                ROW(std::move(childTypes)),
                anyNulls ? AlignedBuffer::allocate<bool>(totalRows, &pool, true) : nullptr,
                0 /* length */,
                previousChildren,
                0 /* nullCount */);
        }
        previousChildren.clear();

        auto &result = dynamic_cast<RowVector &>(*vector);
        for (size_t col = 0; col < rowType.size(); ++col) {
            initializeVector(
                result.childAt(col), rowType.childAt(col), pool, childSources.at(col));
        }
    }

    // Entry point for preparing a result vector: dispatches on type->kind() to
    // the matching initializeVectorImpl instantiation, forwarding all arguments.
    //
    // vector  - result vector, reused or replaced in place by the implementation
    // type    - type of the result vector; its kind selects the implementation
    // pool    - memory pool passed through to the implementation
    // vectors - source vectors the result must be able to hold
    void initializeVector(
        VectorPtr &vector,
        const TypePtr &type,
        memory::MemoryPool &pool,
        const std::vector<const BaseVector *> &vectors) {
        POLLUX_DYNAMIC_TYPE_DISPATCH(
            initializeVectorImpl, type->kind(), vector, type, pool, vectors);
    }

    // Copies the null flags of `count` rows from `source` (starting at
    // `sourceIndex`) into `target` (starting at `targetIndex`) and returns how
    // many of the copied rows are null.
    //
    // The target is first resized to `targetIndex + count`. If the target has no
    // nulls buffer nothing is written and 0 is returned. A constant-encoded
    // source marks every copied row as null.
    vector_size_t copy_nulls(
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        // it's assumed that initVector is called before calling this method to
        // properly allocate/clear nulls buffer. So we only need to check against
        // target vector here.
        target.resize(targetIndex + count, false);
        if (!target.may_have_nulls()) {
            return 0;
        }

        auto *targetNulls = const_cast<uint64_t *>(target.raw_nulls());
        if (source.is_constant_encoding()) {
            for (vector_size_t i = 0; i < count; ++i) {
                bits::set_null(targetNulls, targetIndex + i);
            }
            return count;
        }

        const auto *sourceNulls = source.raw_nulls();
        if (sourceNulls == nullptr) {
            // The source has no nulls buffer: every copied row is non-null.
            for (vector_size_t i = 0; i < count; ++i) {
                bits::clearNull(targetNulls, targetIndex + i);
            }
            return 0;
        }

        vector_size_t nullCount = 0;
        for (vector_size_t i = 0; i < count; ++i) {
            if (bits::isBitNull(sourceNulls, sourceIndex + i)) {
                ++nullCount;
                bits::set_null(targetNulls, targetIndex + i);
            } else {
                bits::clearNull(targetNulls, targetIndex + i);
            }
        }
        return nullCount;
    }

    template<TypeKind K>
    void copyImpl(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        // copy values if not all are nulls
        if (copy_nulls(target, targetIndex, source, sourceIndex, count) != count) {
            using NativeType = typename pollux::TypeTraits<K>::NativeType;
            auto &tgt = static_cast<FlatVector<NativeType> &>(target);
            auto &src = static_cast<const FlatVector<NativeType> &>(source);
            std::copy(
                src.rawValues() + sourceIndex,
                src.rawValues() + sourceIndex + count,
                const_cast<NativeType *>(tgt.rawValues()) + targetIndex);
        }
    }

    // BOOLEAN specialization: values are addressed as bits inside 64-bit words,
    // so the range is copied with bits::copyBits rather than element by element.
    // As in the primary template, values are skipped when every row is null.
    template<>
    void copyImpl<TypeKind::BOOLEAN>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        const auto nullRows = copy_nulls(target, targetIndex, source, sourceIndex, count);
        if (nullRows == count) {
            return;
        }

        auto &flatTarget = static_cast<FlatVector<bool> &>(target);
        const auto &flatSource = static_cast<const FlatVector<bool> &>(source);
        const auto *sourceBits = flatSource.rawValues<uint64_t>();
        auto *targetBits = const_cast<uint64_t *>(flatTarget.rawValues<uint64_t>());
        bits::copyBits(sourceBits, sourceIndex, targetBits, targetIndex, count);
    }

    // Copies `count` StringView entries between flat string vectors. Only the
    // StringView values themselves are copied here; no string bytes are moved by
    // this function. Values are skipped when every row is null.
    void copyStrings(
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        const auto nullRows = copy_nulls(target, targetIndex, source, sourceIndex, count);
        if (nullRows == count) {
            return;
        }

        auto &flatTarget = static_cast<FlatVector<StringView> &>(target);
        const auto &flatSource = static_cast<const FlatVector<StringView> &>(source);
        const auto *first = flatSource.rawValues() + sourceIndex;
        auto *out = const_cast<StringView *>(flatTarget.rawValues()) + targetIndex;
        std::copy(first, first + count, out);
    }

    // VARCHAR specialization: delegates to copyStrings; the type argument is not
    // needed.
    template<>
    void copyImpl<TypeKind::VARCHAR>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        copyStrings(target, targetIndex, source, sourceIndex, count);
    }

    // VARBINARY specialization: handled exactly like VARCHAR, by delegating to
    // copyStrings.
    template<>
    void copyImpl<TypeKind::VARBINARY>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        copyStrings(target, targetIndex, source, sourceIndex, count);
    }

    // Returns the child offset at which row `index` of `target` should start:
    // the offset plus size of the preceding row, or 0 when there is no preceding
    // row (index <= 0).
    template<typename T>
    vector_size_t nextChildOffset(T &target, vector_size_t index) {
        if (UNLIKELY(index <= 0)) {
            return 0;
        }

        const auto previous = index - 1;
        return target.rawOffsets()[previous] + target.rawSizes()[previous];
    }

    // Copies sizes for `count` rows from `source` into `target` and assigns
    // consecutive offsets to them, continuing from the end of the row that
    // precedes `targetIndex`.
    //
    // `childOffset` receives the child offset of the first copied row; the
    // return value is the child offset just past the last copied row.
    template<typename T>
    vector_size_t copyOffsets(
        T &target,
        vector_size_t targetIndex,
        const T &source,
        vector_size_t sourceIndex,
        vector_size_t count,
        vector_size_t &childOffset) {
        target.resize(targetIndex + count);
        auto *targetOffsets = const_cast<vector_size_t *>(target.rawOffsets());
        auto *targetSizes = const_cast<vector_size_t *>(target.rawSizes());
        const auto *sourceSizes = source.rawSizes();
        childOffset = nextChildOffset(target, targetIndex);
        auto runningOffset = childOffset;

        const auto nullRows = copy_nulls(target, targetIndex, source, sourceIndex, count);
        if (nullRows == 0) {
            // No nulls were copied: take the sizes in bulk, then lay out offsets.
            std::copy(
                sourceSizes + sourceIndex,
                sourceSizes + sourceIndex + count,
                targetSizes + targetIndex);
            for (vector_size_t i = 0; i < count; ++i) {
                const auto row = targetIndex + i;
                targetOffsets[row] = runningOffset;
                runningOffset += targetSizes[row];
            }
            return runningOffset;
        }

        // If there is null, process one at a time with null checks.
        const auto *targetNulls = target.raw_nulls();
        for (vector_size_t i = 0; i < count; ++i) {
            const auto row = targetIndex + i;
            // We cannot track the last non-null index efficiently across copy
            // invocations. In order to make it easier for computing child offset, we
            // always fill offset/length even if value is null.
            targetOffsets[row] = runningOffset;
            vector_size_t rowSize = 0;
            if (!bits::isBitNull(targetNulls, row)) {
                rowSize = sourceSizes[sourceIndex + i];
            }
            targetSizes[row] = rowSize;
            runningOffset += rowSize;
        }
        return runningOffset;
    }

    // Fills offsets/sizes for `count` rows copied from a constant source. For now
    // such a source can only be a null-padded vector, i.e. all nulls, so every
    // copied row gets size 0 and the same offset.
    template<typename T>
    void copyOffsetsFromConstInput(
        T &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        POLLUX_DEBUG_ONLY const auto nullCount =
                copy_nulls(target, targetIndex, source, sourceIndex, count);
        POLLUX_DCHECK_EQ(nullCount, count, "Expecting constant null vector input");
        // We cannot track the last non-null index efficiently across copy
        // invocations. In order to make it easier for computing child offset, we
        // always fill offset/length even if value is null.
        auto *targetOffsets = const_cast<vector_size_t *>(target.rawOffsets());
        auto *targetSizes = const_cast<vector_size_t *>(target.rawSizes());
        const auto sharedOffset = nextChildOffset(target, targetIndex);
        const auto endRow = targetIndex + count;
        for (auto row = targetIndex; row < endRow; ++row) {
            targetOffsets[row] = sharedOffset;
            targetSizes[row] = 0;
        }
    }

    // ARRAY specialization of copyImpl: copies `count` array rows from `source`
    // (starting at `sourceIndex`) into `target` (starting at `targetIndex`).
    //
    // Three source encodings are handled:
    //  - constant: every copied row becomes an empty entry via
    //    copyOffsetsFromConstInput;
    //  - dictionary: rows are materialized one at a time from the dictionary's
    //    value vector through copyOne;
    //  - anything else is treated as an ArrayVector and copied in bulk
    //    (offsets/sizes first, then the contiguous element range).
    template<>
    void copyImpl<TypeKind::ARRAY>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        auto &tgt = static_cast<ArrayVector &>(target);
        // Explicitly deal with constant null vectors.
        if (source.is_constant_encoding()) {
            copyOffsetsFromConstInput(tgt, targetIndex, source, sourceIndex, count);
            return;
        }

        if (source.encoding() == VectorEncoding::Simple::DICTIONARY) {
            auto dictSrc =
                    static_cast<const DictionaryVector<pollux::ComplexType> *>(&source);

            copy_nulls(tgt, targetIndex, *dictSrc, sourceIndex, count);
            // TODO: copying offsets. Do we look up the offsets or do we
            // just materialize one by one? I think copyOne pattern is correct,
            // but we need an efficient way to iterate the nulls.
            //
            // Row nullness is taken from the source's null flags at the source
            // row. Reading the target's null buffer at a source position (as was
            // done previously) is wrong whenever targetIndex != sourceIndex, and
            // dereferences a null pointer when the target has no nulls buffer.
            const auto *srcNulls = source.raw_nulls();
            auto tgtOffsets = const_cast<vector_size_t *>(tgt.rawOffsets());
            auto tgtSizes = const_cast<vector_size_t *>(tgt.rawSizes());
            auto childOffset = nextChildOffset(tgt, targetIndex);
            auto alphabet = dictSrc->value_vector()->as<ArrayVector>();
            for (vector_size_t idx = 0; idx < count; ++idx) {
                const auto targetRow = targetIndex + idx;
                const auto sourceRow = sourceIndex + idx;
                const bool isNull =
                        srcNulls != nullptr && bits::isBitNull(srcNulls, sourceRow);
                if (isNull) {
                    // Null rows still get an offset and a zero size so that later
                    // child offsets can be derived from the preceding row.
                    tgtOffsets[targetRow] = childOffset;
                    tgtSizes[targetRow] = 0;
                } else {
                    auto wrapped_index = dictSrc->wrapped_index(sourceRow);
                    // NOTE: it should not make sense for alphabet to have null rows, so
                    // we don't need to do additional null checks or worry about overrides.
                    // NOTE(review): copyOne may resize the target; this code assumes
                    // the raw offset/size pointers taken above stay valid - confirm.
                    copyOne(type, tgt, targetRow, *alphabet, wrapped_index);
                    tgtOffsets[targetRow] = childOffset;
                    tgtSizes[targetRow] = alphabet->sizeAt(wrapped_index);
                    childOffset += tgtSizes[targetRow];
                }
            }
            return;
        }

        auto &src = static_cast<const ArrayVector &>(source);
        vector_size_t childOffset = 0;
        const auto endOffset =
                copyOffsets(tgt, targetIndex, src, sourceIndex, count, childOffset);
        // Total number of child elements covered by the copied rows.
        const auto size = endOffset - childOffset;
        if (size > 0) {
            auto &arrayType = static_cast<const ArrayType &>(*type);
            // NOTE: We assume child values are placed continuously in the source
            // vector, which is the case if it's produced by the column reader
            copy(
                arrayType.elementType(),
                *tgt.elements(),
                childOffset,
                *src.elements(),
                src.rawOffsets()[sourceIndex],
                size);
        }
    }

    // MAP specialization of copyImpl: copies `count` map rows from `source` into
    // `target`. A constant-encoded source produces empty entries; otherwise the
    // offsets/sizes are copied first and then the matching ranges of the keys and
    // values children.
    template<>
    void copyImpl<TypeKind::MAP>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        auto &mapTarget = static_cast<MapVector &>(target);
        // Explicitly deal with constant null vectors.
        if (source.is_constant_encoding()) {
            copyOffsetsFromConstInput(mapTarget, targetIndex, source, sourceIndex, count);
            return;
        }

        const auto &mapSource = static_cast<const MapVector &>(source);
        vector_size_t firstChildOffset = 0;
        const auto endChildOffset = copyOffsets(
            mapTarget, targetIndex, mapSource, sourceIndex, count, firstChildOffset);
        // Total number of key/value entries covered by the copied rows.
        const auto entryCount = endChildOffset - firstChildOffset;
        if (entryCount <= 0) {
            return;
        }

        auto &mapType = type->as_map();
        const auto sourceChildOffset = mapSource.rawOffsets()[sourceIndex];
        // we assume child values are placed continuously in the source vector,
        // which is the case if it's produced by the column reader
        copy(
            mapType.keyType(),
            *mapTarget.mapKeys(),
            firstChildOffset,
            *mapSource.mapKeys(),
            sourceChildOffset,
            entryCount);
        copy(
            mapType.valueType(),
            *mapTarget.mapValues(),
            firstChildOffset,
            *mapSource.mapValues(),
            sourceChildOffset,
            entryCount);
    }

    // ROW specialization of copyImpl: copies the null flags of the row vector
    // and, unless every copied row is null, copies the same row range of each
    // child column using the child's type from `type`.
    template<>
    void copyImpl<TypeKind::ROW>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        const auto nullRows = copy_nulls(target, targetIndex, source, sourceIndex, count);
        if (nullRows == count) {
            return;
        }

        const auto &rowSource = static_cast<const RowVector &>(source);
        auto &rowTarget = static_cast<RowVector &>(target);
        const auto &rowType = static_cast<const RowType &>(*type);
        for (size_t col = 0; col < rowSource.childrenSize(); ++col) {
            copy(
                rowType.childAt(col),
                *rowTarget.childAt(col),
                targetIndex,
                *rowSource.childAt(col),
                sourceIndex,
                count);
        }
    }

    // Copies `count` rows from `source` (starting at `sourceIndex`) into `target`
    // (starting at `targetIndex`) by dispatching on type->kind() to the matching
    // copyImpl instantiation.
    //
    // type        - type of the vectors; its kind selects the implementation
    // target      - destination vector
    // targetIndex - first destination row
    // source      - vector to read from
    // sourceIndex - first source row
    // count       - number of rows to copy
    void copy(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex,
        vector_size_t count) {
        POLLUX_DYNAMIC_TYPE_DISPATCH(
            copyImpl,
            type->kind(),
            type,
            target,
            targetIndex,
            source,
            sourceIndex,
            count);
    }

    // Copies the null flag of a single row from `source` to `target` and returns
    // true when the copied row is null.
    //
    // The target is first resized to `targetIndex + 1`. If the target has no
    // nulls buffer nothing is written and false is returned. A constant-encoded
    // source is always treated as null.
    bool copyNull(
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        // it's assumed that initVector is called before calling this method to
        // properly allocate/clear nulls buffer. So we only need to check against
        // target vector here.
        target.resize(targetIndex + 1, false);
        if (!target.may_have_nulls()) {
            return false;
        }

        bool srcIsNull = source.is_constant_encoding();
        if (!srcIsNull && source.may_have_nulls()) {
            srcIsNull = bits::isBitNull(source.raw_nulls(), sourceIndex);
        }
        auto *targetNulls = const_cast<uint64_t *>(target.raw_nulls());
        bits::set_null(targetNulls, targetIndex, srcIsNull);
        return srcIsNull;
    }

    template<TypeKind K>
    void copyOneImpl(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        // copy value if not null
        if (!copyNull(target, targetIndex, source, sourceIndex)) {
            using NativeType = typename pollux::TypeTraits<K>::NativeType;
            auto &tgt = static_cast<FlatVector<NativeType> &>(target);
            auto &src = static_cast<const FlatVector<NativeType> &>(source);
            const_cast<NativeType *>(tgt.rawValues())[targetIndex] =
                    src.rawValues()[sourceIndex];
        }
    }

    // BOOLEAN specialization: the value is a single bit inside a 64-bit word, so
    // it is read with bits::isBitSet and written with bits::setBit. Nothing is
    // copied when the row is null.
    template<>
    void copyOneImpl<TypeKind::BOOLEAN>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        if (copyNull(target, targetIndex, source, sourceIndex)) {
            return;
        }

        auto &flatTarget = static_cast<FlatVector<bool> &>(target);
        const auto &flatSource = static_cast<const FlatVector<bool> &>(source);
        auto *targetBits = const_cast<uint64_t *>(flatTarget.rawValues<uint64_t>());
        const bool value = bits::isBitSet(flatSource.rawValues<uint64_t>(), sourceIndex);
        bits::setBit(targetBits, targetIndex, value);
    }

    // Copies a single StringView entry between flat string vectors. Only the
    // StringView value is copied here; no string bytes are moved by this
    // function. Nothing is copied when the row is null.
    void copyString(
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        if (copyNull(target, targetIndex, source, sourceIndex)) {
            return;
        }

        auto &flatTarget = static_cast<FlatVector<StringView> &>(target);
        const auto &flatSource = static_cast<const FlatVector<StringView> &>(source);
        auto *targetValues = const_cast<StringView *>(flatTarget.rawValues());
        targetValues[targetIndex] = flatSource.rawValues()[sourceIndex];
    }

    // VARCHAR specialization: delegates to copyString; the type argument is not
    // needed.
    template<>
    void copyOneImpl<TypeKind::VARCHAR>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        copyString(target, targetIndex, source, sourceIndex);
    }

    // VARBINARY specialization: handled exactly like VARCHAR, by delegating to
    // copyString.
    template<>
    void copyOneImpl<TypeKind::VARBINARY>(
        const TypePtr & /* type */,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        copyString(target, targetIndex, source, sourceIndex);
    }

    // Copies the offset/size entry of a single row from `source` to `target`.
    //
    // `childOffset` receives the child offset assigned to the row (the end of the
    // preceding target row). The return value is the row's size: the source size
    // when the row is not null, 0 otherwise.
    template<typename T>
    vector_size_t copyOffset(
        T &target,
        vector_size_t targetIndex,
        const T &source,
        vector_size_t sourceIndex,
        vector_size_t &childOffset) {
        target.resize(targetIndex + 1);
        auto *targetSizes = const_cast<vector_size_t *>(target.rawSizes());
        auto *targetOffsets = const_cast<vector_size_t *>(target.rawOffsets());
        childOffset = nextChildOffset(target, targetIndex);
        targetOffsets[targetIndex] = childOffset;
        // We cannot track the last non-null index efficiently across copy
        // invocations. In order to make it easier for computing child offset, we
        // always fill offset/length even if value is null.
        vector_size_t rowSize = 0;
        if (!copyNull(target, targetIndex, source, sourceIndex)) {
            rowSize = source.rawSizes()[sourceIndex];
        }
        targetSizes[targetIndex] = rowSize;
        return rowSize;
    }

    // Fills the offset/size entry of a single row copied from a constant source.
    // For now such a source can only be a null-padded vector, i.e. all nulls, so
    // the row gets size 0.
    template<typename T>
    void copyOffsetFromConstInput(
        T &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        copyNull(target, targetIndex, source, sourceIndex);
        // We cannot track the last non-null index efficiently across copy
        // invocations. In order to make it easier for computing child offset, we
        // always fill offset/length even if value is null.
        const auto rowOffset = nextChildOffset(target, targetIndex);
        auto *targetOffsets = const_cast<vector_size_t *>(target.rawOffsets());
        auto *targetSizes = const_cast<vector_size_t *>(target.rawSizes());
        targetOffsets[targetIndex] = rowOffset;
        targetSizes[targetIndex] = 0;
    }

    // Copies one ARRAY row from `source[sourceIndex]` into `target[targetIndex]`.
    //
    // Three source encodings are handled:
    //  - constant: treated as an all-null padding vector, only null/offset/size
    //    bookkeeping is written;
    //  - dictionary: the row is resolved through the dictionary's value vector
    //    (the "alphabet") and copied from there;
    //  - otherwise the source is treated as a plain ArrayVector.
    //
    // `type` is the ARRAY type of both vectors; its element type is used when
    // copying the child values.
    template<>
    void copyOneImpl<TypeKind::ARRAY>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        auto &tgt = static_cast<ArrayVector &>(target);
        if (source.is_constant_encoding()) {
            // We already checked in initializeVector that constant source
            // vectors are constant null vectors.
            copyOffsetFromConstInput(tgt, targetIndex, source, sourceIndex);
            return;
        }

        if (source.encoding() == VectorEncoding::Simple::DICTIONARY) {
            auto dictSrc =
                    static_cast<const DictionaryVector<pollux::ComplexType> *>(&source);

            auto childOffset = nextChildOffset(tgt, targetIndex);
            auto alphabet = dictSrc->value_vector()->as<ArrayVector>();
            auto wrapped_index = dictSrc->wrapped_index(sourceIndex);

            auto isNull = copyNull(tgt, targetIndex, *dictSrc, sourceIndex);
            if (!isNull) {
                copyOne(type, tgt, targetIndex, *alphabet, wrapped_index);
            }

            // The nested copyOne goes through copyOffset, which resizes the
            // target. Raw buffer pointers obtained before that call could refer
            // to buffers the resize has replaced, so they are fetched only now,
            // right before the writes.
            const_cast<vector_size_t *>(tgt.rawOffsets())[targetIndex] = childOffset;
            const_cast<vector_size_t *>(tgt.rawSizes())[targetIndex] =
                    isNull ? 0 : alphabet->sizeAt(wrapped_index);
            return;
        }

        auto &src = static_cast<const ArrayVector &>(source);
        vector_size_t childOffset = 0;
        auto size = copyOffset(tgt, targetIndex, src, sourceIndex, childOffset);
        if (size > 0) {
            auto &arrayType = static_cast<const ArrayType &>(*type);
            // we assume child values are placed continuously in the source vector,
            // which is the case if it's produced by the column reader
            copy(
                arrayType.elementType(),
                *tgt.elements(),
                childOffset,
                *src.elements(),
                src.rawOffsets()[sourceIndex],
                size);
        }
    }

    // Copies one MAP row from `source[sourceIndex]` into `target[targetIndex]`.
    //
    // A constant-encoded source only gets null/offset/size bookkeeping. Any
    // other source is treated as a MapVector: its offset/size entry is copied
    // first, then the keys and values of the row are copied as one range each.
    template<>
    void copyOneImpl<TypeKind::MAP>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        auto &mapTarget = static_cast<MapVector &>(target);
        if (source.is_constant_encoding()) {
            // initializeVector has already verified that constant sources are
            // constant null vectors.
            copyOffsetFromConstInput(mapTarget, targetIndex, source, sourceIndex);
            return;
        }

        auto &mapSource = static_cast<const MapVector &>(source);
        vector_size_t targetChildOffset = 0;
        auto entryCount = copyOffset(
            mapTarget, targetIndex, mapSource, sourceIndex, targetChildOffset);
        if (!(entryCount > 0)) {
            // Null or empty row: no keys/values to copy.
            return;
        }

        auto &mapType = static_cast<const MapType &>(*type);
        auto sourceChildOffset = mapSource.rawOffsets()[sourceIndex];

        // The entries of a row are assumed to be stored contiguously in the
        // source, which holds for vectors produced by the column reader.
        copy(
            mapType.keyType(),
            *mapTarget.mapKeys(),
            targetChildOffset,
            *mapSource.mapKeys(),
            sourceChildOffset,
            entryCount);
        copy(
            mapType.valueType(),
            *mapTarget.mapValues(),
            targetChildOffset,
            *mapSource.mapValues(),
            sourceChildOffset,
            entryCount);
    }

    // Copies one ROW value from `source[sourceIndex]` into `target[targetIndex]`.
    //
    // The null flag is copied first. When the row is not null, each child of
    // the source row vector is copied into the child at the same position of
    // the target, using the same row indices.
    template<>
    void copyOneImpl<TypeKind::ROW>(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        if (copyNull(target, targetIndex, source, sourceIndex)) {
            // Null row: there are no field values to copy.
            return;
        }

        auto &rowSource = static_cast<const RowVector &>(source);
        auto &rowTarget = static_cast<RowVector &>(target);
        auto &rowType = static_cast<const RowType &>(*type);

        const auto fieldCount = rowSource.childrenSize();
        for (size_t field = 0; field < fieldCount; ++field) {
            copyOne(
                rowType.childAt(field),
                *rowTarget.childAt(field),
                targetIndex,
                *rowSource.childAt(field),
                sourceIndex);
        }
    }

    // Copies the single row `source[sourceIndex]` into `target[targetIndex]`.
    // The work is forwarded to the copyOneImpl instantiation selected by the
    // runtime kind of `type`.
    void copyOne(
        const TypePtr &type,
        BaseVector &target,
        vector_size_t targetIndex,
        const BaseVector &source,
        vector_size_t sourceIndex) {
        const auto kind = type->kind();
        POLLUX_DYNAMIC_TYPE_DISPATCH(
            copyOneImpl, kind, type, target, targetIndex, source, sourceIndex);
    }
} // namespace kumo::pollux::dwio::common::flatmap
